package org.adam2.sariel.job;

import org.adam2.sariel.Application;
import org.adam2.sariel.factory.ServiceFactory;
import org.adam2.sariel.model.Board;
import org.adam2.sariel.service.BoardService;
import org.adam2.sariel.service.impl.BoardServiceImpl;
import org.adam2.sariel.util.IdUtil;
import org.adam2.sariel.util.MapReduceUtil;
import org.adam2.sariel.util.StringUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@Service
public class LoadBoardJob extends Configured implements Tool {

    private static final String HDFS_URL="hdfs://centos:9000";

    /**
     * Mapper that emits {@code (boardName, 1)} for every data row of the input text.
     *
     * <p>Each input line is split into columns by {@link MapReduceUtil#getColumnList}. The
     * column at index {@link #BOARD_NAME_COLUMN} is used as the output key, so the reducer
     * can count how many rows belong to each board.
     */
    public static class LoadBoardMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        /** Zero-based index of the column that is emitted as the key (the board name). */
        private static final int BOARD_NAME_COLUMN = 7;

        /** Counter group used to report rows that were skipped by this mapper. */
        private static final String COUNTER_GROUP = "LoadBoardMapper";

        /** Counter name for rows that have too few columns to contain the board name. */
        private static final String SHORT_ROW_COUNTER = "ROWS_WITHOUT_BOARD_COLUMN";

        /** Constant count of one, shared by every emitted record. */
        private static final IntWritable ONE = new IntWritable(1);

        /** Reusable output key; overwritten for every emitted record. */
        private final Text keyword = new Text();

        /**
         * Emits the board name of one input line with a count of one.
         *
         * @param key     key of the input record, passed through to {@code getColumnList}
         * @param value   the raw text line
         * @param context task context used to emit the output pair
         * @throws IOException          if writing the output pair fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            List<String> columnList = MapReduceUtil.getColumnList(key, value);

            // A null list marks the first line of the text, which holds the column names.
            if (columnList == null) {
                return;
            }

            // A row that is too short (for example a trailing blank line) has no board column.
            // Skip it instead of failing the whole task with an IndexOutOfBoundsException, and
            // record it in a job counter so the skipped rows stay visible.
            if (columnList.size() <= BOARD_NAME_COLUMN) {
                context.getCounter(COUNTER_GROUP, SHORT_ROW_COUNTER).increment(1);
                return;
            }

            // An earlier version located the board name by scanning for columns containing
            // Chinese text (only the stock name and the board name are Chinese, and the stock
            // name is preceded by the 8-character stock code). The fixed index is used instead.
            keyword.set(columnList.get(BOARD_NAME_COLUMN));
            context.write(keyword, ONE);
        }
    }

    public static class LoadBoardReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        /**
         * BoardService服务类
         */
        private BoardService boardService = (BoardService) ServiceFactory.getInstance(BoardServiceImpl.class);

        /**
         * 用来存储板块的列表
         */
        private List<Board> boardList=new ArrayList<Board>();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : value) {
                sum += val.get();
            }

            Board board=new Board();
            board.setId(IdUtil.generate());
            // 注意：key是Test类型的对象，将它存储到hbase中时会自动补全长度，即如果key是“123”，
            // 则存入hbase是“123\x00\x00\x00”，因此必须使用key.toString().getBytes()，而不能是key.getBytes()
            board.setName(new String(key.toString().getBytes(), "utf-8"));
            board.setAmount(Long.valueOf(sum));
            boardList.add(board);
        }

        /**
         * 每次初始化LoadBoardReducer时，先清空boardList列表
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            if(null!=boardList && boardList.size()!=0){
                boardList.clear();
            }
        }

        /**
         * 在reduce结束时，批量添加Board对象
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            if(null!=boardList && boardList.size()>0){
                List<Mutation> mutationList = new ArrayList<Mutation>();
                for(Board board : boardList){

                    Put p = new Put(Bytes.toBytes(board.getId().toString()));
                    // 注意：Bytes.toBytes的参数必须是String类型，否则在HBase中会以16进制显示
                    p.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(board.getName().toString()));
                    p.add(Bytes.toBytes("info"), Bytes.toBytes("amount"), Bytes.toBytes(board.getAmount().toString()));
                    mutationList.add(p);
                }
                boardService.batchPut(mutationList);
            }
        }
    }

    /**
     * Configures and runs the board-loading MapReduce job, blocking until it finishes.
     *
     * <p>The output path is deleted first via {@code MapReduceUtil.deleteOutput}, then the job
     * is set up with {@link LoadBoardMapper} and {@link LoadBoardReducer}.
     *
     * <p>NOTE(review): the return value is the reverse of the usual exit-code convention
     * (0 = success). It is kept as-is because existing callers may rely on it.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @return {@code 1} if the job completed successfully, {@code 0} if it failed
     * @throws IllegalArgumentException if fewer than two arguments are supplied
     * @throws Exception                if configuring, submitting or running the job fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Fail with a clear message instead of a bare ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            throw new IllegalArgumentException(
                    "Usage: " + this.getClass().getSimpleName() + " <input path> <output path>");
        }
        String inputLocation = args[0];
        String outputLocation = args[1];

        // Obtain the configuration.
        Configuration conf = super.getConf();

        // Delete the output path on HDFS before the job is created.
        MapReduceUtil.deleteOutput(conf, HDFS_URL, outputLocation);

        // Create the job.
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(Application.class);

        // Input: the path the mappers read from.
        FileInputFormat.addInputPath(job, new Path(inputLocation));

        // Map: mapper class and the key/value types it emits.
        job.setMapperClass(LoadBoardJob.LoadBoardMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reduce: reducer class and the final output key/value types.
        job.setReducerClass(LoadBoardJob.LoadBoardReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Output: the job's output path.
        FileOutputFormat.setOutputPath(job, new Path(outputLocation));

        // Submit the job and wait for it to finish; 1 means success, 0 means failure.
        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 1 : 0;
    }
}
